/**
 *    Copyright (C) 2025-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#pragma once

#include "mongo/base/string_data.h"
#include "mongo/db/exec/agg/stage.h"
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/exec/document_value/value_comparator.h"
#include "mongo/db/memory_tracking/memory_usage_tracker.h"
#include "mongo/db/pipeline/document_source_densify.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/query/stage_memory_limit_knobs/knobs.h"
#include "mongo/util/modules.h"

#include <cstddef>
#include <list>
#include <tuple>
#include <utility>

#include <boost/optional.hpp>
#include <boost/smart_ptr/intrusive_ptr.hpp>

namespace mongo {
namespace exec {
namespace agg {

/**
 * Execution stage that densifies its input on '_field': it pulls documents from the previous
 * stage and, driven by the '_range' statement, interleaves them with documents produced by a
 * 'DocGenerator'. The last densify value seen for every partition is remembered in
 * '_partitionTable', whose memory footprint is charged against '_memTracker'.
 */
class InternalDensifyStage final : public Stage {
public:
    /**
     * Produces the generated documents for one stretch of the densify range, starting at '_min'.
     * Once generation is finished it hands back '_finalDoc' (when one was supplied) and then
     * reports itself as done.
     */
    class DocGenerator {
    public:
        DocGenerator(DensifyValue current,
                     RangeStatement range,
                     FieldPath fieldName,
                     boost::optional<Document> includeFields,
                     boost::optional<Document> finalDoc,
                     boost::optional<Value> partitionKey,
                     ValueComparator comp,
                     size_t* counter,
                     bool maxInclusive);

        // Returns the next document this generator has to offer.
        Document getNextDocument();

        // Whether this generator has nothing left to return.
        bool done() const;

        // Helper to return whether this is the last generated document. This is useful because
        // the last generated document is always on the step. Expected to be called after generating
        // a document to describe the document that was just generated as 'last' or 'not last'.
        bool lastGeneratedDoc() const;

    private:
        ValueComparator _comp;
        RangeStatement _range;
        // The field to add to 'includeFields' to generate a document.
        FieldPath _path;
        Document _includeFields;
        // The document that is equal to or larger than the upper bound that prompted the
        // creation of this generator. Will be returned after the final generated document. Can
        // be boost::none if we are generating the values at the end of the range.
        boost::optional<Document> _finalDoc;
        // The partition key for all documents created by this generator.
        boost::optional<Value> _partitionKey;
        // The minimum value that this generator will create, therefore the next generated
        // document will have this value.
        DensifyValue _min;
        // Mirrors the 'maxInclusive' constructor argument.
        bool _maxInclusive = false;

        enum class GeneratorState {
            // Generating documents between '_min' and the upper bound.
            kGeneratingDocuments,
            // Generated all necessary documents, waiting for a final 'getNextDocument()' call.
            kReturningFinalDocument,
            kDone,
        };

        GeneratorState _state = GeneratorState::kGeneratingDocuments;
        // Value to increment when returning a generated document. This is a pointer to the counter
        // that keeps track of the total number of documents generated by the owning stage across
        // all generators.
        size_t* _counter;
    };

    /**
     * Stores the densify field, the partition paths and the range statement. The memory limit is
     * loaded from the 'DocumentSourceDensifyMaxMemoryBytes' knob and the cap on generated
     * documents from 'internalQueryMaxAllowedDensifyDocs'. The partition table is built with the
     * expression context's value comparator.
     */
    InternalDensifyStage(StringData stageName,
                         const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
                         FieldPath field,
                         std::list<FieldPath> partitions,
                         RangeStatement range)
        : Stage(stageName, pExpCtx),
          _field(std::move(field)),
          _maxDocs(internalQueryMaxAllowedDensifyDocs.load()),
          _memTracker(loadMemoryLimit(StageMemoryLimit::DocumentSourceDensifyMaxMemoryBytes)),
          _partitions(std::move(partitions)),
          _range(std::move(range)),
          _partitionTable(pExpCtx->getValueComparator()
                              .makeUnorderedValueMap<SimpleMemoryUsageTokenWith<DensifyValue>>()) {}

private:
    enum class ValComparedToRange {
        kBelow,
        kRangeMin,
        kInside,
        kAbove,
    };

    GetNextResult doGetNext() final;

    /**
     * Decides whether or not to build a DocGen and return the first document generated or return
     * the current doc if the rangeMin + step is greater than rangeMax.
     * Optionally include the densify and partition values for the generator if they've already
     * been calculated.
     */
    GetNextResult handleNeedGen(Document currentDoc,
                                DensifyValue lastSeen,
                                DensifyValue& densifyVal,
                                Value& partitionKey);

    /**
     * Check whether or not the first document in the partition needs to be densified. Returns the
     * document this iteration should return.
     */
    GetNextResult checkFirstDocAgainstRangeStart(Document doc,
                                                 DensifyValue& densifyVal,
                                                 Value& partitionKey);

    /**
     * Handles when the pSource has been exhausted. Has different behavior depending on the densify
     * mode, but generally speaking sets the min/max for what still needs to be done, and then
     * delegates to helpers to finish densifying each partition individually. In the non-partitioned
     * case, this is the "trivial" partition of the whole collection.
     */
    GetNextResult handleSourceExhausted();

    /**
     * Handles building a document generator once we've seen an EOF for partitioned input. Min will
     * be the last seen value in the partition unless it is less than the optional 'minOverride'.
     * Helper is to share code between visit functions.
     */
    GetNextResult finishDensifyingPartitionedInputHelper(
        DensifyValue max,
        boost::optional<DensifyValue> minOverride = boost::none,
        bool maxInclusive = false);

    /**
     * Create a document generator for the given range statement. The generation will start at 'min'
     * (inclusive) and will go to the end of the given 'range'. Whether or not a document is
     * generated at the range maximum depends on 'maxInclusive' -- if true, the range will be
     * inclusive on both ends.
     * Will output documents that include any given 'includeFields' (with their values) and, if
     * given, will output 'finalDoc' unchanged at the end of the generated documents.
     * Pass the partition key to the generator to avoid having to compute it for each document
     * the generator creates.
     */
    void createDocGenerator(DensifyValue min,
                            RangeStatement range,
                            boost::optional<Document> includeFields,
                            boost::optional<Document> finalDoc,
                            boost::optional<Value> partitionKey,
                            bool maxInclusive = false) {
        // Every by-value argument is a sink: forward it into the generator instead of copying it
        // a second time.
        _docGenerator = DocGenerator(std::move(min),
                                     std::move(range),
                                     _field,
                                     std::move(includeFields),
                                     std::move(finalDoc),
                                     std::move(partitionKey),
                                     pExpCtx->getValueComparator(),
                                     &_docsGenerated,
                                     maxInclusive);
    }

    /**
     * Convenience overload: a generator with no include fields, no final document and no
     * partition key, using the default (exclusive) handling of the range maximum.
     */
    void createDocGenerator(DensifyValue min, RangeStatement range) {
        createDocGenerator(
            std::move(min), std::move(range), boost::none, boost::none, boost::none);
    }

    boost::optional<Document> createIncludeFieldsObj(Value partitionKey);

    /**
     * Set up necessary tracking variables based on the densify mode. Only called once at the
     * beginning of execution.
     */
    void initializeState();

    /**
     * Helper to pull a document from the previous stage and verify that it is eligible for
     * densification. Returns a tuple of <Return immediately, pulled document, value to be
     * densified, partition key>.
     */
    std::tuple<bool, GetNextResult, DensifyValue, Value> getAndCheckInvalidDoc();

    /**
     * Evaluates '_partitionExpr' against 'doc' and returns the resulting partition key.
     */
    Value getDensifyPartition(const Document& doc) {
        return _partitionExpr->evaluate(doc, &getContext()->variables);
    }

    /**
     * Helper to set the value in the partition table. The entry is stored together with a memory
     * token sized from the approximate sizes of the key and the value; throws 6007200 if the
     * tracker is over its limit once the entry has been stored.
     */
    void setPartitionValue(DensifyValue partitionVal, Value partitionKey) {
        // The sizes must be read before the arguments are moved into the table below.
        SimpleMemoryUsageToken memoryToken{
            partitionKey.getApproximateSize() + partitionVal.getApproximateSize(), &_memTracker};
        _partitionTable[std::move(partitionKey)] = {std::move(memoryToken),
                                                    std::move(partitionVal)};
        uassert(6007200,
                str::stream() << "$densify exceeded memory limit of "
                              << _memTracker.maxAllowedMemoryUsageBytes(),
                _memTracker.withinMemoryLimit());
    }

    /**
     * Records the densify value for the partition that 'doc' belongs to. The value is taken from
     * 'doc' unless 'valueOverride' is provided. Requires '_partitionExpr' to be set.
     */
    void setPartitionValue(Document doc,
                           boost::optional<DensifyValue> valueOverride = boost::none) {
        tassert(8246103, "partitionExpr", _partitionExpr);
        auto partitionKey = getDensifyPartition(doc);
        auto partitionVal = valueOverride ? *valueOverride : getDensifyValue(doc);
        setPartitionValue(std::move(partitionVal), std::move(partitionKey));
    }

    /**
     * Extracts the densify value at '_field' from 'doc' and checks that its type agrees with the
     * range's unit (see 'assertDensifyType').
     */
    DensifyValue getDensifyValue(const Document& doc) {
        auto val = DensifyValue::getFromDocument(doc, _field);
        assertDensifyType(val);
        return val;
    }

    /**
     * Converts 'val' into a densify value and checks that its type agrees with the range's unit
     * (see 'assertDensifyType').
     */
    DensifyValue getDensifyValue(const Value& val) {
        auto densifyVal = DensifyValue::getFromValue(val);
        assertDensifyType(densifyVal);
        return densifyVal;
    }

    /**
     * Throws 6053600 unless 'val' is numeric when the range has no unit, or a date when the range
     * has a unit.
     */
    void assertDensifyType(const DensifyValue& val) {
        uassert(6053600,
                val.isNumber()
                    ? "Encountered numeric densify value in collection when step has a date unit."
                    : "Encountered date densify value in collection when step does not have a date "
                      "unit.",
                (!_range.getUnit() && val.isNumber()) || (_range.getUnit() && val.isDate()));
    }

    // The generator currently in use, if any. Replaced by every call to 'createDocGenerator()'.
    boost::optional<DocGenerator> _docGenerator = boost::none;

    // The field on which we are densifying.
    FieldPath _field;

    // _partitionExpr has two purposes:
    // 1. to determine which partition a document belongs in.
    // 2. to initialize new documents with the right partition key.
    // For example, if the stage had 'partitionByFields: ["a", "x.y"]' then this expression
    // would be {a: "$a", x: {y: "$x.y"}}.
    // In the non-partitioned case, this is set to be a constant expression "true".
    boost::intrusive_ptr<Expression> _partitionExpr;

    enum class DensifyState {
        kUninitialized,
        kNeedGen,
        kHaveGenerator,
        kFinishingDensifyNoGenerator,
        kFinishingDensifyWithGenerator,
        kDensifyDone
    };

    // Keep track of documents generated, error if it goes above the limit.
    size_t _docsGenerated = 0;
    size_t _maxDocs = 0;
    SimpleMemoryUsageTracker _memTracker;

    DensifyState _densifyState = DensifyState::kUninitialized;

    // Used to keep track of the bounds for densification.
    // Track the minimum seen across the input set. Should be the first document seen.
    boost::optional<DensifyValue> _fullDensifyGlobalMin = boost::none;
    // Track the maximum seen across the input set. Should be the last document seen.
    boost::optional<DensifyValue> _fullDensifyGlobalMax = boost::none;
    bool _isFullDensify = false;

    // Value to store the beginning/end of the densification range. Note that these may also be
    // stored in the range object we had parsed initially, but we store them here again for easier
    // access and to avoid using parsing objects during execution.
    boost::optional<DensifyValue> _rangeDensifyStart = boost::none;
    boost::optional<DensifyValue> _rangeDensifyEnd = boost::none;

    // The list of partitions we are using to densify taken from the original stage object.
    std::list<FieldPath> _partitions;

    // Range statement taken from the original stage object.
    RangeStatement _range;

    // Store of the value we've seen for each partition. In the non-partitioned case should only
    // have one key -- "true". This allows us to pretend that all input is partitioned and use the
    // same codepath.
    ValueUnorderedMap<SimpleMemoryUsageTokenWith<DensifyValue>> _partitionTable;
};

}  // namespace agg
}  // namespace exec
}  // namespace mongo
